And code bits:
from pyspark.sql import SQLContext
And here's where you can check on your local Spark cluster
There's a great Markdown cheatsheet on GitHub here
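A quick sanity check that the notebook is wired up to a live SparkContext (just a sketch; with a default local setup the Spark UI is usually reachable at http://localhost:4040):
In [ ]:
# confirm the SparkContext exists and see what it is pointed at
print "Spark version: " + sc.version
print "master: " + sc.master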
In [1]:
# set your working directory if you want less pathy things later
WORK_DIR = '/Users/amcasari/repos/wwconnect-2016-spark4everyone/'
In [2]:
# create an RDD from bikes data
# sc is an existing SparkContext (initialized when PySpark starts)
bikes = sc.textFile(WORK_DIR + "data/bikes/p*")
bikes.count()
Out[2]:
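Before transforming anything, it can help to peek at a few raw lines; a quick sketch using take(), which just pulls the first few records back to the driver:
In [ ]:
# peek at the first few raw lines of the csv files
for line in bikes.take(3):
    print line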
In [3]:
# import SQLContext
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
In [4]:
# since we are familiar with pandas dataframes, let's convert the RDD to a Spark DataFrame
# we'll try to infer the schema from the files
bikes_df = sqlContext.createDataFrame(bikes)
In [5]:
# whoops a daisy: createDataFrame can't infer a schema from an RDD of plain strings,
# so let's remove the header, split out the Rows + programmatically specify the schema
names = bikes.first().replace('"','')
names
Out[5]:
In [6]:
# remove the header using subtract
bikesHeader = bikes.filter(lambda x: "instant" in x)
bikesFiltered = bikes.subtract(bikesHeader)
bikesFiltered.count()
Out[6]:
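subtract() works, but it shuffles the whole RDD just to drop one header line per file; a lighter-weight sketch (assuming, like the filter above, that the only lines containing "instant" are headers) is:
In [ ]:
# same result without the shuffle that subtract() incurs
bikesFiltered = bikes.filter(lambda x: "instant" not in x)
bikesFiltered.count()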
In [7]:
# programmatically specify the schema using a StructField
from pyspark.sql.types import *
fields = [StructField(field_name, StringType(), False) for field_name in names.split(',')]
fields
Out[7]:
In [8]:
schema = StructType(fields)
schema
Out[8]:
In [9]:
# convert each line in the csv to a tuple
parts = bikesFiltered.map(lambda l: l.split(","))
bikesSplit = parts.map(lambda p: (p[0], p[1], p[2], p[3], p[4], p[5], p[6], p[7], p[8], p[9], p[10],
                                  p[11], p[12], p[13], p[14], p[15], p[16]))
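Listing p[0] through p[16] by hand is easy to mistype; a more compact sketch (assuming every row really does have 17 fields) builds the tuple directly:
In [ ]:
# same result with less typing: turn each split line into a tuple in one go
bikesSplit = parts.map(lambda p: tuple(p))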
In [10]:
# Apply the schema to the RDD.
bikes_df = sqlContext.createDataFrame(bikesSplit, schema)
In [11]:
bikes_df.show()
In [12]:
bikes_df.printSchema()
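Every field was declared as StringType, so everything prints as a string; if numeric columns are needed later, a hedged sketch of casting a couple of them (the column names 'cnt' and 'temp' are assumed from the bike sharing csv header) looks like this:
In [ ]:
# cast a couple of columns to numeric types for easier analysis
# ('cnt' and 'temp' are assumed column names from the csv header)
from pyspark.sql.functions import col
bikes_typed = bikes_df.withColumn('cnt', col('cnt').cast(IntegerType())) \
                      .withColumn('temp', col('temp').cast(DoubleType()))
bikes_typed.printSchema()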
In [13]:
# now we can look for trends + data quality questions...
# total # of rows in the DataFrame
num_rows = bikes_df.count()
# number of distinct rows in the DataFrame
num_distinct = bikes_df.distinct().count()
# and we can start to see how the plain Python objects pySpark returns can be used locally
print "count() returns a python object of type " + str(type(num_rows))
print "number of duplicate rows in the DataFrame: " + str(num_rows - num_distinct)
In [14]:
# check out some more df methods
bikes_df.groupBy('holiday').count().show()
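The same pattern works for any column; for example, a quick sketch grouping by the 'mnth' column we filter on below:
In [ ]:
# trips per month (mnth is still a string here, so the ordering is lexicographic)
bikes_df.groupBy('mnth').count().orderBy('mnth').show()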
In [15]:
# let's look at trips in July
july_trips = bikes_df.filter(bikes_df['mnth'] == 7)
# since we'll be running several actions over this data, let's persist the DataFrame in memory
july_trips.persist()
Out[15]:
In [16]:
july_trips.count()
Out[16]:
In [17]:
july_trips.show()
In [ ]:
# what else would you examine here? (one idea is sketched below)
# more DataFrame functions can be found in the documentation (listed in refs)
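As one possible answer, a hedged sketch grouping July trips by weather (the 'weathersit' column name is assumed from the bike sharing csv header):
In [ ]:
# one idea: how do July trips break down by weather?
# ('weathersit' is an assumed column name from the csv header)
july_trips.groupBy('weathersit').count().show()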
In [18]:
# when we're done working with the data, remove it from memory
july_trips.unpersist()
Out[18]:
In [19]:
# create an RDD from music lyrics + perform the classic WordCount
from operator import add
lines = sc.textFile(WORK_DIR + "data/music/machete - amanda palmer")
counts = lines.flatMap(lambda x: x.split(' ')) \
              .map(lambda x: (x, 1)) \
              .reduceByKey(add)
output = counts.collect()
for (word, count) in output:
    print "%s: %i" % (word, count)